package com.iteaj.iot.client.mqtt;

import cn.hutool.core.util.StrUtil;
import com.iteaj.iot.IotThreadManager;
import com.iteaj.iot.SocketMessage;
import com.iteaj.iot.client.ClientConnectProperties;
import com.iteaj.iot.client.MultiClientManager;
import com.iteaj.iot.client.component.TcpClientComponent;
import com.iteaj.iot.client.mqtt.message.MqttClientMessage;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
import io.netty.handler.codec.mqtt.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * Client component based on the MQTT protocol.
 *
 * <p>Besides creating {@link MqttClient} instances, the component offers convenience
 * methods to ping the broker and to subscribe/unsubscribe topics, and it schedules a
 * periodic sweep over the message-id managers of all connected clients.
 *
 * @see MqttClient
 * @param <M> the MQTT client message type handled by this component
 */
public abstract class MqttClientComponent<M extends MqttClientMessage> extends TcpClientComponent<M> {

    private static final Logger logger = LoggerFactory.getLogger(MqttClientComponent.class);

    /** Initial delay and period, in seconds, of the resend sweep. */
    private static final long RESEND_INTERVAL_SECONDS = 10;

    /** Listener for publish events; defaults to {@link MessagePublishListener#LOGGER_LISTENER}. */
    private MessagePublishListener publishListener = MessagePublishListener.LOGGER_LISTENER;

    /**
     * Creates a component without a default client configuration and installs the
     * default MQTT decoder interceptor.
     */
    public MqttClientComponent() {
        this.setInterceptor(MqttDecoderInterceptor.DEFAULT);
    }

    /**
     * Creates a component with a default client configuration.
     *
     * @param config the default client configuration
     */
    public MqttClientComponent(MqttConnectProperties config) {
        super(config);
        this.setInterceptor(MqttDecoderInterceptor.DEFAULT);
    }

    /**
     * Creates a component with a default client configuration and a client manager.
     *
     * @param config the default client configuration
     * @param clientManager the manager passed on to the parent component
     */
    public MqttClientComponent(MqttConnectProperties config, MultiClientManager clientManager) {
        super(config, clientManager);
        this.setInterceptor(MqttDecoderInterceptor.DEFAULT);
    }

    /**
     * Creates a component with a default client configuration, a client manager and a
     * publish listener that replaces the default one.
     *
     * @param config the default client configuration
     * @param clientManager the manager passed on to the parent component
     * @param publishListener the listener to use; stored as given, without a null check
     */
    public MqttClientComponent(ClientConnectProperties config, MultiClientManager clientManager, MessagePublishListener publishListener) {
        super(config, clientManager);
        this.publishListener = publishListener;
        this.setInterceptor(MqttDecoderInterceptor.DEFAULT);
    }

    /**
     * Returns the message unchanged; no building is performed here.
     * According to the original note, message construction was moved to the method
     * referenced below.
     *
     * @see MqttClient#buildPublishMessage(ChannelHandlerContext, MqttPublishMessage)
     * @param message the message that was read
     * @return the very same {@code message} instance
     */
    @Override
    public SocketMessage readBuild(SocketMessage message) {
        return message;
    }

    /**
     * Not supported by the MQTT component.
     *
     * @param message ignored
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    @Override
    public SocketMessage createMessage(byte[] message) {
        throw new UnsupportedOperationException("请使用方法createMessage(MqttPublishMessage)替代");
    }

    /**
     * Creates a new, not yet connected MQTT client for the given configuration.
     *
     * @param config the client configuration; must be a {@link MqttConnectProperties}
     * @return the new client
     * @throws MqttClientException if {@code config} is not a {@link MqttConnectProperties}
     */
    @Override
    public MqttClient createNewClient(ClientConnectProperties config) {
        if (config instanceof MqttConnectProperties) {
            return new MqttClient(this, (MqttConnectProperties) config);
        }

        throw new MqttClientException("mqtt配置只支持类型["+MqttConnectProperties.class.getSimpleName()+"]");
    }

    /**
     * Returns the default configuration narrowed to {@link MqttConnectProperties}.
     *
     * @return the default MQTT configuration as held by the parent component
     */
    @Override
    public MqttConnectProperties getConfig() {
        return (MqttConnectProperties) super.getConfig();
    }

    /**
     * Delegates to the parent implementation and narrows the result to {@link MqttClient}.
     *
     * @param config the client configuration
     * @return the client returned by the parent implementation
     */
    @Override
    public MqttClient createNewClientAndConnect(ClientConnectProperties config) {
        return (MqttClient) super.createNewClientAndConnect(config);
    }

    /**
     * Starts the component via the parent implementation and then schedules the periodic
     * resend sweep on the worker group of {@link IotThreadManager}.
     *
     * <p>NOTE(review): every invocation schedules another sweep and the returned future
     * is not retained — confirm that {@code start} is only invoked once per component.
     *
     * @param config passed through to the parent {@code start}
     */
    @Override
    public void start(Object config) {
        super.start(config);

        // Periodic resend handling.
        resendMsgSchedule(IotThreadManager.instance().getWorkerGroup());
    }

    /**
     * Schedules a fixed-rate task (first run after ten seconds, then every ten seconds)
     * that calls {@code expire()} on the message-id manager of every connected client.
     *
     * <p>Failures are logged and isolated per client, so a problem with one client does
     * not prevent the remaining clients from being processed in the same run.
     *
     * @param arg the event loop group on which the task is scheduled
     */
    protected void resendMsgSchedule(EventLoopGroup arg) {
        arg.scheduleAtFixedRate(() -> {
            try {
                this.clients().forEach(item -> {
                    try {
                        MqttClient mqttClient = (MqttClient) item;
                        // Resend/removal handling applies only to clients that are still online.
                        if (mqttClient.isConnect()) {
                            mqttClient.getMessageIdManager().expire();
                        }
                    } catch (Exception clientError) {
                        // Keep going: one failing client must not abort the sweep for the others.
                        logger.error("mqtt客户端重发错误", clientError);
                    }
                });
            } catch (Exception sweepError) {
                // An exception escaping a fixed-rate task would cancel all further runs.
                logger.error("mqtt客户端重发错误", sweepError);
            }
        }, RESEND_INTERVAL_SECONDS, RESEND_INTERVAL_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Sends a PINGREQ using the default client.
     *
     * <p>NOTE(review): the default client is looked up via {@code connectKey()} here,
     * whereas the default-client overloads of subscribe/unsubscribe use
     * {@code getClientId()} — confirm that both resolve to the same client.
     *
     * @return the future of the write operation
     * @throws IllegalStateException if the default client cannot be found
     */
    public ChannelFuture ping() {
        return ping(getConfig().connectKey());
    }

    /**
     * Sends a PINGREQ through the channel of the given client.
     *
     * <p>NOTE(review): the original documentation states that an empty id falls back to
     * the default configuration from {@link #getConfig()}; no such fallback is implemented
     * in this method, so it would have to come from the inherited client lookup — confirm.
     *
     * @param clientId the key used to look up the client
     * @return the future of the write operation
     * @throws IllegalStateException if no client is found for {@code clientId}
     */
    public ChannelFuture ping(String clientId) {
        MqttClient client = getClient(clientId);
        if (client == null) {
            throw new IllegalStateException("获取不到客户端["+clientId+"]");
        }

        return client.getChannel().writeAndFlush(MqttMessage.PINGREQ);
    }

    /**
     * Subscribes to a topic using the default client.
     *
     * @param topic the topic filter; must not be blank
     * @param qoS the maximum QoS the server may use when publishing to this client
     * @return the future of the write operation
     * @throws IllegalArgumentException if {@code topic} is blank
     * @throws MqttClientException if the default client cannot be found
     */
    public ChannelFuture subscribe(String topic, MqttQoS qoS) {
        return this.subscribe(getConfig().getClientId(), topic, qoS);
    }

    /**
     * Subscribes to a topic using the given client.
     *
     * @param clientId the key used to look up the client
     * @param topic the topic filter; must not be blank
     * @param qoS the maximum QoS the server may use when publishing to this client
     * @return the future of the write operation
     * @throws IllegalArgumentException if {@code topic} is blank
     * @throws MqttClientException if no client is found for {@code clientId}
     */
    public ChannelFuture subscribe(String clientId, String topic, MqttQoS qoS) {
        if (StrUtil.isBlank(topic)) {
            throw new IllegalArgumentException("[topic]必填");
        }

        MqttClient client = getClient(clientId);
        if (client == null) {
            throw new MqttClientException("获取不到客户端["+clientId+"]");
        }

        // The message id comes from the client's id manager,
        // see https://gitee.com/iteaj/iot/issues/I5P9ZX
        int messageId = client.getMessageIdManager().nextId();
        MqttSubscribeMessage subscribeMessage = MqttMessageBuilders.subscribe()
                .messageId(messageId)
                .addSubscription(qoS, topic)
                .build();
        return client.writeAndFlush(subscribeMessage);
    }

    /**
     * Unsubscribes from a topic using the default client.
     *
     * @param topic the topic filter; must not be blank
     * @return the future of the write operation
     * @throws IllegalArgumentException if {@code topic} is blank
     * @throws MqttClientException if the default client cannot be found
     */
    public ChannelFuture unsubscribe(String topic) {
        return this.unsubscribe(getConfig().getClientId(), topic);
    }

    /**
     * Unsubscribes from a topic using the given client.
     *
     * @param clientId the key used to look up the client
     * @param topic the topic filter; must not be blank
     * @return the future of the write operation
     * @throws IllegalArgumentException if {@code topic} is blank
     * @throws MqttClientException if no client is found for {@code clientId}
     */
    public ChannelFuture unsubscribe(String clientId, String topic) {
        if (StrUtil.isBlank(topic)) {
            throw new IllegalArgumentException("[topic]必填");
        }

        MqttClient client = getClient(clientId);
        if (client == null) {
            throw new MqttClientException("获取不到客户端["+clientId+"]");
        }

        int messageId = client.getMessageIdManager().nextId();
        MqttUnsubscribeMessage unsubscribeMessage = MqttMessageBuilders.unsubscribe()
                .messageId(messageId)
                .addTopicFilter(topic)
                .build();
        return client.writeAndFlush(unsubscribeMessage);
    }

    /**
     * Supplies the topic subscriptions for a client.
     * Per the original documentation this is invoked after the connection succeeds; the
     * call site is not part of this class.
     *
     * @see MqttQoS the QoS of a subscription is the maximum QoS the server uses when
     *      publishing on that topic to this client
     * @param client the configuration of the MQTT client that is subscribing
     * @return the subscriptions to register
     */
    protected abstract List<MqttTopicSubscription> doSubscribe(MqttConnectProperties client);

    /**
     * Looks up a client via the parent implementation and narrows it to {@link MqttClient}.
     *
     * @param clientId the key used to look up the client
     * @return whatever the parent lookup returns, cast to {@link MqttClient}
     */
    @Override
    public MqttClient getClient(Object clientId) {
        return (MqttClient) super.getClient(clientId);
    }

    /**
     * Replaces the publish listener.
     *
     * @param publishListener the new listener; stored as given, without a null check
     * @return this component, to allow call chaining
     */
    public MqttClientComponent<M> setPublishListener(MessagePublishListener publishListener) {
        this.publishListener = publishListener;
        return this;
    }

    /**
     * Returns the current publish listener.
     *
     * @return the listener set at construction or via
     *         {@link #setPublishListener(MessagePublishListener)}
     */
    public MessagePublishListener getPublishListener() {
        return publishListener;
    }

}
